#!/usr/bin/env python3
#
# Copyright 2018, Data61, CSIRO (ABN 41 687 119 230)
#
# SPDX-License-Identifier: BSD-2-Clause
#

'''
Set up and run tests for CAmkES capDL refinement proofs.
'''

import itertools
import os
import shutil
import subprocess
import sys
import tempfile
import time

# Names of the CAmkES apps to test, in the order they will be run.
# Each name is passed to the build system as -DCAMKES_APP=<name>.
# Commented-out entries are apps that don't currently work, but where
# we have a good idea of what needs to be fixed; the trailing comment
# on each one gives the reason.
test_apps = [
    'adder',
    #'aeroplage', # x86 only
    #'alignment', # x86 only
    'attributes',
    'binary-semaphore',
    'cakeml_hello',
    #'cakeml_regex', # FIXME: requires setting -DCAKEMLDIR in init-build
    'cakeml_tipc',
    'cms-donate',
    'cs-donate',
    'cs-nodonate',
    'dataport',
    #'debug-simple', # x86 only
    'dhcp',
    'dma-example',
    'epit',
    'event',
    'event-driven',
    'exchangestring',
    'filter',
    'global-imports',
    #'hellorust', # x86 only
    'hierarchical-attributes',
    'hierarchical-components',
    #'keyboard', # x86 only
    'lockserver',
    'mcs-donate',
    'mcs-nodonate',
    'multiassembly',
    'multiclient',
    #'multiplier', # camkes ADL model doesn't support int array
    'mutex',
    'periodic',
    #'pit', # x86 only
    'reversestring',
    'rotate',
    #'rumprun_ethernet', # x86 only
    #'rumprun_hello',    # x86 only
    #'rumprun_pthreads', # x86 only
    #'rumprun_rust',     # x86 only
    'serialserver_polling',
    'serialserver_interrupt',
    'serialserver_loopback',
    'simple',
    'simplesingleaddressspace',
    'socket',
    'structs',
    'swapcounter',
    'terminal',
    'testbufvariant',
    'testcamkes438',
    'testcontrolname',
    'testdataportbifurcate',
    'testdataportmux',
    'testdataportmuxflat',
    'testdataportptrwrap',
    'testdataportrpc',
    'testfaulthandlers',
    'testgrouping',
    'testgroupingcontrol',
    'testhardwareinterrupt',
    'testhwdataportlrgpages',
    'testnto1mmio',
    'testnto1overload',
    'testrefin',
    'testreplycapprotection',
    'testsel4notification',
    'testsingleaddressspaceheap',
    #'teststringarrays', # camkes ADL model doesn't support array of string
    'testsyscalls',
    'testunderscorename',
    'timeserver',
    'uart',
    #'vgatest', # x86 only
    ]

# Test these option combinations.
# main() builds every app once for each combination in the cross
# product of these value lists, passing each chosen value to the build
# system as -D<option>=<value>.
# The first value of each option list will be the default used in this
# test runner (once we add a command line option for that).
test_options = {
    # Having extra fault handler threads and EPs shouldn't matter,
    # because they are internal to components.
    'CAmkESFaultHandlers': ['1'],

    # This only gives each component access to its own TCB,
    # which shouldn't affect the integrity model.
    'CAmkESProvideTCBCaps': ['1'],
}

# Default app build options.
# This builds for the current verified ARM platform, sabre (on QEMU).
# Every entry is passed to init-build.sh as -D<key>=<value> for every
# app; PLATFORM may be replaced per app via override_app_platform.
standard_build_config = {
    'PLATFORM': 'sabre',
    'ARM': '1',
    'CAmkESCapDLVerification': '1',
    'CAmkESCapDLStaticAlloc': '1',
}
# TODO: accept extra config on command line

# PLATFORM adjustments.
# Maps an app name to the PLATFORM value to build it for, replacing the
# one in standard_build_config. Apps not listed here use the default.
override_app_platform = {
    'serialserver_interrupt': 'exynos5410',
    'serialserver_polling': 'exynos5410',
    'serialserver_loopback': 'exynos5410',
    'uart': 'kzm',
}


# Messages
# TODO: allow adjusting verbosity
def info(msg):
    '''Print an informational message on stdout with the run_tests prefix.'''
    line = 'run_tests: info: %s' % msg
    print(line)

def fatal(msg):
    '''Report a fatal error on stderr, then terminate with exit status 1.'''
    line = 'run_tests: fatal: %s' % msg
    print(line, file=sys.stderr)
    raise SystemExit(1)

def run_cmd(cmdline, **kwargs):
    '''Run a command, printing its output if it fails.

    cmdline is the argument list to execute. Any keyword arguments
    (e.g. cwd) are passed through to subprocess.check_output. The
    command's stderr is merged into its stdout.

    Returns the captured output of the command.

    Raises subprocess.CalledProcessError if the command exits with a
    non-zero status, after printing the exit code and captured output.
    '''
    print('run_tests: command: %s' %
          ', '.join([repr(cmdline)] + ['%s=%s' % x for x in kwargs.items()]))
    # Monotonic clock, so wall-clock adjustments can't skew the duration.
    start = time.monotonic()
    try:
        return subprocess.check_output(cmdline, stderr=subprocess.STDOUT, **kwargs)
    except subprocess.CalledProcessError as exn:
        print('run_tests: command failed with code %d' % exn.returncode)
        output = exn.output
        if isinstance(output, bytes):
            # Tool output is not guaranteed to be valid UTF-8, and a
            # decoding error here would mask the real failure.
            output = output.decode('utf-8', errors='replace')
        print(output)
        raise
    finally:
        duration = time.monotonic() - start
        if duration > 1.0:
            info('command took %.3g seconds' % duration)

class TempDir:
    '''Context manager that creates a temporary directory on entry.

    The directory is removed again on exit, unless the body raised an
    exception and cleanup_on_error is false.
    '''
    def __init__(self, prefix=None, parent_dir=None, cleanup_on_error=True):
        self.prefix, self.parent_dir = prefix, parent_dir
        self.cleanup_on_error = cleanup_on_error

    def __enter__(self):
        '''Create the directory and return its path.'''
        self.filename = tempfile.mkdtemp(dir=self.parent_dir, prefix=self.prefix)
        return self.filename

    def __exit__(self, exn_type, exn_val, traceback):
        '''Remove the directory unless it is being kept after an error.'''
        keep = exn_type is not None and not self.cleanup_on_error
        if not keep:
            shutil.rmtree(self.filename)
        # Never suppress an exception raised by the body.
        return False

# Expected paths, relative to the root of the camkes project structure.
# main() resolves each of these against the project root before use.
camkes_tool_rel_dir = 'projects/camkes-tool'    # queried with 'git describe'
isabelle_rel_dir    = 'projects/l4v/isabelle'   # must contain bin/isabelle
l4v_rel_dir         = 'projects/l4v/l4v'        # l4v's run_tests.py is run from here
init_build_rel_path = 'init-build.sh'           # entry point to the build system

# Main.
# TODO: argparse
def main():
    '''Build every app in test_apps and run its capDL refinement proofs.

    Locates the project tools (init-build.sh, Isabelle, l4v, ninja),
    sets up l4v, then builds and checks each app once per combination
    of test_options. Finally prints a summary, including which apps and
    options are associated with failures.

    Returns 0 if every test passed, or 1 if any test failed or hit an
    error. Exits via fatal() if a required tool cannot be found.
    '''
    info('Testing %d apps...' % len(test_apps))

    this_script_dir = os.path.dirname(os.path.realpath(__file__))
    # Expected path to our script
    if this_script_dir.endswith('projects/camkes-tool/cdl-refine-tests'):
        # Build in the project root by default
        camkes_root = os.path.normpath(os.path.join(this_script_dir, '../../..'))
        if camkes_root != os.getcwd():
            info('Changing directory to project root: %s' % camkes_root)
            os.chdir(camkes_root)
    else:
        camkes_root = os.getcwd() # guessed

    def get_abs_path(rel_path):
        '''Resolve a path in the camkes project dir'''
        return os.path.normpath(os.path.join(camkes_root, rel_path))

    def git_describe(repo_dir):
        '''Describe the revision checked out in repo_dir (best effort).'''
        try:
            return subprocess.check_output(
                ['git', 'describe', '--tags'],
                cwd=repo_dir
                ).decode('utf-8', errors='replace').strip()
        except (OSError, subprocess.CalledProcessError) as exn:
            # OSError covers git not being installed and repo_dir not
            # existing; the revision is informational only, so carry on.
            return 'unknown [error: %s]' % str(exn)

    # Find the entry point to the project's build system.
    init_build_tool = get_abs_path(init_build_rel_path)
    if not os.path.exists(init_build_tool):
        fatal('init-build.sh not found at: %s' % init_build_tool)

    # Show the CAmkES version.
    info('camkes-tool revision: %s' %
         git_describe(get_abs_path(camkes_tool_rel_dir)))

    # Find Isabelle.
    isabelle_dir = get_abs_path(isabelle_rel_dir)
    isabelle_tool = os.path.join(isabelle_dir, 'bin/isabelle')
    if not os.path.exists(isabelle_tool):
        fatal('''isabelle not found at: %s
(Make sure to check out the correct project manifest)''' % isabelle_tool)
    info('Isabelle revision: %s' % git_describe(isabelle_dir))

    # Find l4v.
    l4v_dir = get_abs_path(l4v_rel_dir)
    if not os.path.exists(l4v_dir):
        fatal('l4v not found at: %s' % l4v_dir)
    info('l4v revision: %s' % git_describe(l4v_dir))

    # Find ninja-build.
    ninja_build_tool = 'ninja'
    try:
        ninja_version = subprocess.check_output(
            [ninja_build_tool, '--version']
            ).decode('utf-8', errors='replace').strip()
    except (OSError, subprocess.CalledProcessError) as exn:
        fatal('''can't run ninja-build tool at: %s
[%s]''' % (ninja_build_tool, str(exn)))
    info('ninja-build version: %s' % ninja_version)

    # Some of the l4v sessions rely on generated files.
    # Here, we run l4v's own build system to generate them.
    # Also pre-build the CAmkES formal model while we're at it.

    # NB: we can't use l4v/run_tests directly because it builds with
    #     slightly different Isabelle env options, causing our
    #     'isabelle build' in build_one() to miss the image cache
    info('Setting up l4v...')
    run_cmd(['./misc/regression/run_tests.py', '-v', 'CamkesCdlRefine'],
            cwd=l4v_dir)
    info('Done setting up l4v')

    # Keep track of which apps or options caused failures.
    # Counters are keyed by (option name, value); apps are recorded
    # under the pseudo-option 'app'.
    def init_per_opt_counts():
        return dict(((k, v), 0)
                    for k, vals in
                        [('app', test_apps)] + list(test_options.items())
                    for v in vals)
    def incr_opt_counts(counts, app_name, build_opts):
        counts[('app', app_name)] += 1
        for k, v in build_opts.items():
            counts[(k, v)] += 1
    def num_counts(counts):
        # Each test increments exactly one 'app' entry, so summing
        # those gives the number of tests.
        return sum(count for (opt_name, opt_val), count in counts.items()
                   if opt_name == 'app')

    # Set up test counters.
    per_opt_tests  = init_per_opt_counts()
    per_opt_failed = init_per_opt_counts()
    per_opt_error  = init_per_opt_counts()

    def build_one(app_name, this_build_config):
        '''Do one test run for the given app and build options.'''
        # Use temporary directory, but in the project root.
        # Any other location should also work, but this is the
        # “standard” place for it.
        with TempDir(prefix='build-%s-' % app_name,
                     parent_dir=os.getcwd(),
                     cleanup_on_error=False
                    ) as build_dir:
            try:
                try:
                    info('Build directory: %s' % build_dir)
                    app_build_config = dict(standard_build_config)
                    if app_name in override_app_platform:
                        app_build_config['PLATFORM'] = override_app_platform[app_name]
                    build_cmdline = (
                        [init_build_tool,
                         '-DCAMKES_APP=%s' % app_name] +
                        ['-D%s=%s' % opt for opt in sorted(app_build_config.items())] +
                        ['-D%s=%s' % opt for opt in sorted(this_build_config.items())]
                        )
                    run_cmd(build_cmdline, cwd=build_dir)
                    run_cmd([ninja_build_tool], cwd=build_dir)
                    info('App build succeeded.')

                    # We expect the build system to generate theory files here
                    app_thy_dir = os.path.join(build_dir, 'cdl-refine')
                except BaseException:
                    # failed, but not in proofs
                    incr_opt_counts(per_opt_error, app_name, this_build_config)
                    raise

                try:
                    info('Running proofs...')
                    isabelle_cmdline = [
                        'timeout', '1h',
                        isabelle_tool, 'build',
                        '-d', l4v_dir,
                        '-D', app_thy_dir,
                        '-v'
                    ]
                    run_cmd(isabelle_cmdline, cwd=build_dir)
                except Exception:
                    # proof failed
                    incr_opt_counts(per_opt_failed, app_name, this_build_config)
                    raise
                except BaseException:
                    # might be KeyboardInterrupt; not proof failure
                    incr_opt_counts(per_opt_error, app_name, this_build_config)
                    raise
            except BaseException:
                info('Build directory retained at: %s' % build_dir)
                raise

    # Main test loop.
    try:
        for app_name in test_apps:
            for this_build_config in map(dict, itertools.product(
                    *[[(k, v) for v in vals]
                      for k, vals in sorted(test_options.items())])):
                incr_opt_counts(per_opt_tests, app_name, this_build_config)

                info('Testing app: %s' % app_name)
                info('Build config for this test:')
                if this_build_config:
                    for k, v in sorted(this_build_config.items()):
                        info('    %s=%s' % (k, v))
                else:
                    info('    (nothing)')

                try:
                    build_one(app_name, dict(this_build_config))
                    info('Test succeeded.\n\n')
                except Exception as exn:
                    info('Test failed with exception:\n  %s\n\n' % str(exn))
    except KeyboardInterrupt:
        info('Aborting tests...')

    num_tests  = num_counts(per_opt_tests)
    num_failed = num_counts(per_opt_failed)
    num_error  = num_counts(per_opt_error)
    num_passed = num_tests - num_failed - num_error
    info('Summary: %d test(s), %d passed, %d failed, %d error' %
         (num_tests, num_passed, num_failed, num_error))

    # Print details of which options are associated with failures.
    for desc, num, per_opt_counts in (
                ('Failure', num_failed, per_opt_failed),
                ('Error', num_error, per_opt_error)
            ):
        if num:
            info('%s counts:' % desc)
            counts = []
            # Compare against the number of tests run with each option,
            # not against the failure counts themselves.
            for (k, v), tests in per_opt_tests.items():
                failed = per_opt_counts[(k, v)]
                if failed == 0:
                    continue
                counts.append((float(failed) / tests,
                               failed, tests, k, v))
            def most_failures(x):
                frac_failed, failed, tests, k, v = x
                return (-frac_failed, -failed, k) # tiebreak on k
            for frac_failed, failed, tests, k, v in sorted(counts, key=most_failures):
                info('    %d/%d when %s=%s' % (failed, tests, k, v))

    return 1 if num_failed or num_error else 0

# Script entry point: the process exit status is whatever main() returns.
if __name__ == '__main__':
    raise SystemExit(main())
